Source code for hysop.numerics.fft.fft

# Copyright (c) HySoP 2011-2024
#
# This file is part of HySoP software.
# See "https://particle_methods.gricad-pages.univ-grenoble-alpes.fr/hysop-doc/"
# for further info.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


"""
Base interface for fast Fourier Transforms.

:class:`~hysop.numerics.fft.FFTI`
:class:`~hysop.numerics.fft.FFTPlanI`
:class:`~hysop.numerics.fft.FFTQueueI`

:class:`~hysop.numerics.fft.HysopFFTWarning`
:class:`~hysop.numerics.fft.HysopFFTDataLayoutError`

Methods defined here:
  simd_alignment, is_byte_aligned, mk_shape, mk_view
"""

from abc import ABCMeta, abstractmethod
import warnings
import functools
import numpy as np
from pyfftw import simd_alignment, is_byte_aligned as is_simd_byte_aligned

from hysop import vprint, __VERBOSE__
from hysop.constants import Backend, TransformType
from hysop.tools.htypes import first_not_None, check_instance
from hysop.tools.numerics import float_to_complex_dtype, complex_to_float_dtype
from hysop.tools.units import bytes2str
from hysop.tools.warning import HysopWarning
from hysop.tools.spectral_utils import SpectralTransformUtils as STU
from hysop.core.arrays.array import Array
from hysop.core.arrays.array_backend import ArrayBackend
from hysop.testsenv import __HAS_OPENCL_BACKEND__


[docs] def is_byte_aligned(array): from hysop.backend.host.host_array import HostArray if __HAS_OPENCL_BACKEND__: from hysop.backend.device.opencl.opencl_array import OpenClArray, clArray if isinstance(array, (HostArray, np.ndarray)): return is_simd_byte_aligned(array) elif __HAS_OPENCL_BACKEND__ and isinstance(array, (OpenClArray, clArray.Array)): return array.offset == 0 else: msg = f"Unknown array type {type(array)}." raise TypeError(msg)
[docs] def mk_shape(base_shape, axis, N): """ Utility function to create a modified shape from base_shape on a specified axis. """ shape = list(base_shape) shape[axis] = N return tuple(shape)
[docs] def mk_view(ndim, axis, *args, **kwds): """ Utility function to create a view on a n-dimensional array. Returns a tuple containing default axe view (ellipsis) on all axis but the one specified by 'axis' which is replaced by slice(*args) if len(args)>1 else args[0]. """ default = kwds.pop("default", slice(None, None, None)) assert args, "Need at least one arg !" assert not kwds, f"Unknown keyword arguments: {kwds.keys()}" view = [default] * ndim if len(args) == 1: view[axis] = args[0] else: view[axis] = slice(*args) return tuple(view)
[docs] class HysopFFTWarning(HysopWarning): """ Specific tag to issue FFT related warnings. """ pass
[docs] class HysopFFTDataLayoutError(ValueError): """ Specific error to raise for incompatible strides. """ pass
[docs] class FFTQueueI: """Command queue like objects to define n-dimensional transforms."""
[docs] @abstractmethod def execute(self, wait_for=None): """Execute all planned plans.""" pass
[docs] @abstractmethod def __iadd__(self, *plans): """Add a plan to the queue.""" pass
[docs] def __call__(self, **kwds): """Alias for execute.""" return self.execute(**kwds)
[docs] class FFTPlanI(metaclass=ABCMeta): """ Common inteface for FFT plans. Basically just a functor that holds relevant data to execute a preconfigurated FFT-like tranform. """ def __init__(self, verbose=__VERBOSE__): self.verbose = verbose self._setup = False self._allocated = False
[docs] @abstractmethod def input_array(self): """ Return currently planned input array. """ pass
[docs] @abstractmethod def output_array(self): """ Return currently planned output array. """ pass
[docs] def setup(self, queue=None): """ Method that has to be called before any call to execute. """ if self._setup: msg = "Plan was already setup..." raise RuntimeError(msg) self._setup = True return self
@property def required_buffer_size(self): """ Return the required temporary buffer size in bytes to compute the transform. """ assert self._setup return 0
[docs] def allocate(self, buf=None): """Provide or allocate required temporary buffer.""" assert self._setup assert not self._allocated self._allocated = True
[docs] @abstractmethod def execute(self): """ Execute the FFT plan on current input and output array. """ pass
[docs] def __call__(self, a=None, out=None, **kwds): """ Apply the FFT plan to possibly new input or output arrays. """ if (a is not None) or (out is not None): msg = "New array execute is not available for FFT backend {}." msg = msg.format(type(self).__name__) raise RuntimeError(msg) self.execute(**kwds)
[docs] class FFTI(metaclass=ABCMeta): """ Interface to compute local to process FFT-like transforms. Common inteface for all array backends, based on the numpy.fft interface. Standard FFTs: complex to complex (C2C) fft() Compute the 1-dimensional discrete Fourier Transform. ifft() Compute the 1-dimensional inverse discrete Fourier Transform. Real data FFTS: real to complex (R2C) and complex to real (C2R) rfft() Compute the 1-dimensional discrete Fourier Transform for real input. irfft() Compute the inverse of the 1-dimensional FFT of real input. Real FFTS: real to real (R2R) dct() Compute one of the discrete cosine transforms of a real input. dst() Compute one of the discrete sine transforms of a real input. Supported R2R transforms are at least: DCT-I, DCT-II, DCT-III DST-I, DST-II, DST-III Other R2R transforms: DCT-IV and DCT-IV are only supported by the FFTW backend at this time. DCT-V to DCT-VIII and DST-V to DST-VII are not supported by any FFT backend. About floating point precision: By default, both simple and double precision are supported. numpy only supports double precision (simple precision is supported by casting). FFTW also supports long double precision. Normalization: The default normalization has the direct transforms unscaled and the inverse transform is scaled by 1/N where N is the logical size of the transform. N should not to be confused with the physical size of the input arrays n: FFT, RFFT: N = n DCT-II, DCT-III, DCT-IV: N = 2*n DST-II, DST-III, DST-IV: N = 2*n DCT-I: N = 2*(n-1) DST-I: N = 2*(n+1) Orthogonal normalization is not supported by default, however the user may specify its custom normalization for each transform via the 'scaling' keyword parameter. Inverse transforms (up to scaling): Just add i in front of the method name to get the inverse transform with good scaling. For a given transform T, iT(T(X)) should always yield X within machine accuracy. Underlying inverse transform mapping is: FFT: IFFT RFFT: IRFFT DCT-I: DCT-I DCT-II: DCT-III DCT-III: DCT-II DCT-IV: DCT-IV DST-I: DST-I DST-II: DST-III DST-III: DST-II DST-IV: DST-IV Other methods that this interface defines: *Create queue *Transpose *Copy *Zero fill Those methods will be used by the n-dimensional planner. """ __transform2fn = { TransformType.FFT: ("fft", {}), TransformType.IFFT: ("ifft", {}), TransformType.RFFT: ("rfft", {}), TransformType.IRFFT: ("irfft", {}), TransformType.DCT_I: ("dct", {"type": 1}), TransformType.DCT_II: ("dct", {"type": 2}), TransformType.DCT_III: ("dct", {"type": 3}), TransformType.DCT_IV: ("dct", {"type": 4}), TransformType.IDCT_I: ("idct", {"type": 1}), TransformType.IDCT_II: ("idct", {"type": 2}), TransformType.IDCT_III: ("idct", {"type": 3}), TransformType.IDCT_IV: ("idct", {"type": 4}), TransformType.DST_I: ("dst", {"type": 1}), TransformType.DST_II: ("dst", {"type": 2}), TransformType.DST_III: ("dst", {"type": 3}), TransformType.DST_IV: ("dst", {"type": 4}), TransformType.IDST_I: ("idst", {"type": 1}), TransformType.IDST_II: ("idst", {"type": 2}), TransformType.IDST_III: ("idst", {"type": 3}), TransformType.IDST_IV: ("idst", {"type": 4}), }
[docs] @classmethod def default_interface_from_backend( cls, backend, enable_opencl_host_buffer_mapping, **kwds ): check_instance(backend, ArrayBackend) if backend.kind is Backend.HOST: from hysop.numerics.fft.host_fft import HostFFTI assert not enable_opencl_host_buffer_mapping return HostFFTI.default_interface(**kwds) elif backend.kind is Backend.OPENCL: if enable_opencl_host_buffer_mapping: from hysop.numerics.fft.host_fft import HostFFTI return HostFFTI.default_interface( backend=backend.host_array_backend, **kwds ) else: from hysop.numerics.fft.opencl_fft import OpenClFFTI return OpenClFFTI.default_interface(cl_env=backend.cl_env, **kwds) else: msg = f"Unknown backend kind {backend.kind}."
[docs] def check_backend(self, backend, enable_opencl_host_buffer_mapping): check_instance(backend, ArrayBackend) if enable_opencl_host_buffer_mapping: if self.backend is not backend.host_array_backend: msg = "Host array backend mismatch {} vs {}." msg = msg.format(self.backend, backend) raise RuntimeError(msg) else: if self.backend is not backend: msg = "Backend mismatch {} vs {}." msg = msg.format(self.backend, backend) raise RuntimeError(msg)
[docs] def get_transform(self, transform): check_instance(transform, TransformType) if transform not in self.__transform2fn: msg = f"Unknown transform type {transform}." raise NotImplementedError(transform) (fname, fkwds) = self.__transform2fn[transform] fn = getattr(self, fname) if fkwds: fn = functools.partial(fn, **fkwds) return fn
def __init__(self, backend, warn_on_allocation=True, error_on_allocation=False): """Initializes the interface and default supported real and complex types.""" from hysop.core.arrays.array_backend import ArrayBackend check_instance(backend, ArrayBackend) check_instance(warn_on_allocation, bool) check_instance(error_on_allocation, bool) self.supported_ftypes = (np.float32, np.float64) self.supported_ctypes = (np.complex64, np.complex128) self.supported_cosine_transforms = (1, 2, 3) self.supported_sine_transforms = (1, 2, 3) self.backend = backend self.warn_on_allocation = warn_on_allocation self.error_on_allocation = error_on_allocation
[docs] def allocate_output(self, out, shape, dtype): """Alocate output if required and check shape and dtype.""" if out is None: if self.warn_on_allocation or self.error_on_allocation: nbytes = np.prod(shape, dtype=np.int64) * dtype.itemsize msg = "FftwFFT: allocating aligned output array of size {}." msg = msg.format(bytes2str(nbytes)) if self.error_on_allocation: raise RuntimeError(msg) else: warnings.warn(msg, HysopFFTWarning) out = self.backend.empty(shape=shape, dtype=dtype) else: assert out.dtype == dtype assert out.shape == shape return out
[docs] @classmethod def default_interface(cls, **kwds): """Get the default FFT interface.""" msg = "{}.default_interface() has not been implemented yet !" msg = msg.format(cls.__name__) raise NotImplementedError(msg)
[docs] def allocate_plans(self, op, plans, tmp_buffer=None): """Allocate and share a buffer on given backend to a group of plans.""" backend = self.backend tmp_size = max(plan.required_buffer_size for plan in plans) if tmp_size > 0: msg = "Operator {}: Allocating an additional {} temporary buffer for FFT backend {}." msg = msg.format( op.pretty_name, bytes2str(tmp_size), self.__class__.__name__ ) if tmp_buffer is not None: assert tmp_buffer.nbytes >= tmp_size else: if self.error_on_allocation: raise RuntimeError(msg) elif self.warn_on_allocation: from .gpyfft_fft import HysopGpyFftWarning warnings.warn(msg, HysopGpyFftWarning) else: vprint(msg) tmp_buffer = backend.empty(shape=(tmp_size), dtype=np.uint8) for plan in plans: if plan.required_buffer_size > tmp_buffer.nbytes: msg = ( "\nFATAL ERROR: Failed to allocate temporary buffer for clFFT." ) msg += "\n => clFFT expected {} bytes but only {} bytes have been " msg += "allocated.\n" msg = msg.format(plan.required_buffer_size, tmp_buffer.nbytes) raise RuntimeError(msg) elif plan.required_buffer_size > 0: buf = tmp_buffer[: plan.required_buffer_size] plan.allocate(buf=buf) else: plan.allocate() else: for plan in plans: assert plan.required_buffer_size == 0 plan.allocate() tmp_buffer = None return tmp_buffer
[docs] @abstractmethod def fft(self, a, out, axis=-1, **kwds): """ Compute the unscaled one-dimensional complex to complex discrete Fourier Transform. Parameters ---------- a: array_like of np.complex64 or np.complex128 Complex input array. out: array_like of np.complex64 or np.complex128 Complex output array of the same shape and dtype as the input. axis: int, optional Axis over witch to compute the FFT. Defaults to last axis. Returns ------- (shape, dtype) of the output array determined from the input array. Notes ----- N = a.shape[axis] out[0] will contain the sum of the signal (zero-frequency term always real for real inputs). If N is even: out[1:N/2] contains the positive frequency terms. out[N/2] contains the Nyquist frequency (always real for real inputs). out[N/2+1:] contains the negative frequency terms. Else if N is odd: out[1:(N-1)/2] contains the positive frequency terms. out[(N-1)/2:] contains the negative frequency terms. """ assert a.dtype in self.supported_ctypes, a.dtype if out is not None: assert a.dtype == out.dtype assert np.array_equal(a.shape, out.shape) return (a.shape, a.dtype)
[docs] @abstractmethod def ifft(self, a, out, axis=-1, **kwds): """ Compute the one-dimensional complex to complex discrete Fourier Transform scaled by 1/N. Parameters ---------- a: array_like of np.complex64 or np.complex128 Complex input array. out: array_like of np.complex64 or np.complex128 Complex output array of the same shape and dtype as the input. axis: int, optional Axis over witch to compute the FFT. Defaults to last axis. Returns ------- (shape, dtype, logical_size) of the output array determined from the input array. """ assert a.dtype in self.supported_ctypes, a.dtype if out is not None: assert a.dtype == out.dtype assert np.array_equal(a.shape, out.shape) return (a.shape, a.dtype, a.shape[axis])
[docs] @abstractmethod def rfft(self, a, out, axis=-1, **kwds): """ Compute the unscaled one-dimensional real to hermitian complex discrete Fourier Transform. Parameters ---------- a: array_like of np.float32 or np.float64 Real input array. out: array_like of np.complex64 or np.complex128 Complex output array of matching complex dtype. out.shape[...] = a.shape[...] out.shape[axis] = a.shape[axis]//2 + 1 axis: int, optional Axis over witch to compute the transform. Defaults to last axis. Returns ------- (shape, dtype) of the output array determined from the input array. Notes ----- For real inputs there is no information in the negative frequency components that is not already available from the positive frequency component because of the Hermitian symmetry. N = out.shape[axis] = a.shape[axis]//2 + 1 out[0] will contain the sum of the signal (zero-frequency term, always real). If N is even: out[1:N/2] contains the positive frequency terms. out[N/2] contains the Nyquist frequency (always real). Else if N is odd: out[1:(N+1)/2] contains the positive frequency terms. """ assert a.dtype in self.supported_ftypes ctype = float_to_complex_dtype(a.dtype) cshape = list(a.shape) cshape[axis] = cshape[axis] // 2 + 1 cshape = tuple(cshape) if out is not None: assert out.dtype in self.supported_ctypes assert ctype == out.dtype assert np.array_equal(out.shape, cshape) return (cshape, ctype)
[docs] @abstractmethod def irfft(self, a, out, n=None, axis=-1, **kwds): """ Compute the one-dimensional hermitian complex to real discrete Fourier Transform scaled by 1/N. Parameters ---------- a: array_like of np.complex64 or np.complex128 Complex input array. out: array_like of np.float32 or np.float64 Real output array of matching real type. out.shape[...] = a.shape[...] Last axis should match forward transform size used: 1) out.shape[axis] = 2*(a.shape[axis]-1) 2) out.shape[axis] = 2*(a.shape[axis]-1) + 1 n: int, optional Length of the transformed axis of the output. ie: n should be in [2*(a.shape[axis]-1), 2*(a.shape[axis]-1)+1] axis: int, optional Axis over witch to compute the transform. Defaults to last axis. Notes ----- To get an odd number of output points, n or out must be specified. Returns ------- (shape, dtype, logical_size) of the output array determined from the input array, out and n. """ assert a.dtype in self.supported_ctypes cshape = a.shape rtype = complex_to_float_dtype(a.dtype) rshape_even, rshape_odd = list(a.shape), list(a.shape) rshape_even[axis] = 2 * (cshape[axis] - 1) rshape_odd[axis] = 2 * (cshape[axis] - 1) + 1 if out is not None: assert out.dtype in self.supported_ftypes assert rtype == out.dtype ns = out.shape[axis] if n is not None: assert ( ns == n ), "output shape mismatch with specified transformed output size." else: n = ns if n % 2 == 0: assert np.array_equal(out.shape, rshape_even) else: assert np.array_equal(out.shape, rshape_odd) if (n is None) or (n % 2 == 0): rshape = rshape_even n = rshape[axis] else: rshape = rshape_odd rshape = tuple(rshape) logical_size = n assert rshape[axis] == logical_size return (rshape, rtype, logical_size)
[docs] @abstractmethod def dct(self, a, out=None, type=2, axis=-1, **kwds): """ Compute the one-dimensional Cosine Transform of specified type. Parameters ---------- a: array_like Real input array. out: array_like Real output array of matching input type and shape. axis: int, optional Axis over witch to compute the transform. Defaults to last axis. Returns ------- (shape, dtype) of the output array determined from the input array. """ assert a.dtype in self.supported_ftypes, a.dtype assert ( type in self.supported_cosine_transforms ), self.supported_cosine_transforms if out is not None: assert a.dtype == out.dtype assert np.array_equal(a.shape, out.shape) return (a.shape, a.dtype)
[docs] @abstractmethod def idct(self, a, out=None, type=2, axis=-1, **kwds): """ Compute the one-dimensional Inverse Cosine Transform of specified type. Default scaling is 1/(2*N) for IDCT type (2,3,4) and 1/(2*N-2) for IDCT type 1. Parameters ---------- a: array_like Real input array. out: array_like Real output array of matching input type and shape. axis: int, optional Axis over witch to compute the transform. Defaults to last axis. Returns ------- (shape, dtype, inverse_type, logical_size) of the output array determined from the input array. """ itype = [1, 3, 2, 4][type - 1] n = a.shape[axis] N = 2 * (n - (itype == 1)) logical_size = N assert a.dtype in self.supported_ftypes, a.dtype assert ( itype in self.supported_cosine_transforms ), self.supported_cosine_transforms if out is not None: assert a.dtype == out.dtype assert np.array_equal(a.shape, out.shape) return (a.shape, a.dtype, itype, logical_size)
[docs] @abstractmethod def dst(self, a, out=None, type=2, axis=-1, **kwds): """ Compute the one-dimensional Sine Transform of specified type. Parameters ---------- a: array_like Real input array. out: array_like Real output array of matching input type and shape. axis: int, optional Axis over witch to compute the transform. Defaults to last axis. Returns ------- (shape, dtype) of the output array determined from the input array. """ assert a.dtype in self.supported_ftypes, a.dtype assert type in self.supported_sine_transforms, self.supported_sine_transforms if out is not None: assert a.dtype == out.dtype assert np.array_equal(a.shape, out.shape) return (a.shape, a.dtype)
[docs] @abstractmethod def idst(self, a, out=None, type=2, axis=-1, **kwds): """ Compute the one-dimensional Inverse Sine Transform of specified type. Default scaling is 1/(2*N) for IDST type (2,3,4) and 1/(2*N+2) for IDST type 1. Parameters ---------- a: array_like Real input array. out: array_like Real output array of matching input type and shape. axis: int, optional Axis over witch to compute the transform. Defaults to last axis. Returns ------- (shape, dtype, inverse_type, logical_size) of the output array determined from the input array. """ itype = [1, 3, 2, 4][type - 1] n = a.shape[axis] N = 2 * (n + (itype == 1)) logical_size = N assert a.dtype in self.supported_ftypes, a.dtype assert type in self.supported_sine_transforms, self.supported_sine_transforms if out is not None: assert a.dtype == out.dtype assert np.array_equal(a.shape, out.shape) return (a.shape, a.dtype, itype, logical_size)
[docs] @abstractmethod def new_queue(self, tg, name): """Return a FFTQueue object valid with this backend.""" pass
[docs] @abstractmethod def plan_copy(self, tg, src, dst): """Plan a copy from src to dst.""" pass
[docs] @abstractmethod def plan_accumulate(self, tg, src, dst): """Plan an accumulation from src into dst.""" pass
[docs] @abstractmethod def plan_transpose(self, tg, src, dst, axes): """Plan a transpose from src to dst using given axes.""" pass
[docs] @abstractmethod def plan_fill_zeros(self, tg, a, slices): """Plan to fill every input slices of input array a with zeroes.""" pass
[docs] @abstractmethod def plan_compute_energy(self, tg, fshape, src, dst, transforms, mutexes=None): """Plan to compute energy from src to energy.""" assert src.ndim == len(transforms) assert dst.ndim == 1 N = tuple(int(_) for _ in src.shape) K2 = () NS2 = () C2C = () R2C = 0 S = 1.0 assert len(fshape) == len(N) == len(transforms) for Fi, Ni, Ti in zip(fshape, N, transforms): c2c = int(STU.is_C2C(Ti)) r2c = int(STU.is_R2C(Ti) or STU.is_C2R(Ti)) Ki = Ni // 2 if c2c else Ni - 1 if r2c: Si = Fi / 2.0 else: Si = Fi S *= Si K2 += (Ki**2,) NS2 += ((Ni + 1) // 2,) C2C += (c2c,) R2C |= r2c S = 1 / S # for C2C we need to check j = (i<(N+1)//2 ? i : N-i) max_wavenumber = int(round(sum(K2) ** 0.5, 0)) msg = f"Destination buffer should have size {max_wavenumber+1} but has size {dst.size}." assert dst.size == max_wavenumber + 1, msg return (N, NS2, C2C, R2C, S)